/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;

import javax.sql.DataSource;

import org.apache.commons.lang3.tuple.Triple;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
import org.springframework.batch.integration.partition.StepExecutionRequest;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.unidata.mdm.core.configuration.job.ModularBatchJobSupportFactoryBean;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.service.impl.PositiveValueJobParameterValidator;
import org.unidata.mdm.core.service.job.JobFairPartitionHandler;
import org.unidata.mdm.core.service.job.JobParameterValidator;
import org.unidata.mdm.core.service.job.JobStepExecutionRequestHandler;
import org.unidata.mdm.core.service.job.JobTargetedPartitionHandler;
import org.unidata.mdm.core.service.job.JobTemplateParameters;
import org.unidata.mdm.core.type.job.ModularBatchJobStepExecutionListener;
import org.unidata.mdm.core.type.keys.LSN;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobDataItemProcessor;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobDataItemReader;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobDataItemWriter;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobDataPartitioner;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobDataStepExecutionListener;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobListener;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobMappingItemReader;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobMappingItemWriter;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobMappingPartitioner;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobPrepareItemWriter;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobPreparePartitioner;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobResetItemWriter;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobResetModelItemReader;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobResetModelItemWriter;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobResetModelPartitioner;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobResetPartitioner;
import org.unidata.mdm.job.reindex.service.impl.ReindexDataJobTypesEnumParamExtractor;
import org.unidata.mdm.system.configuration.AbstractConfiguration;
import org.unidata.mdm.system.type.runtime.MeasurementContextName;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;

/**
 * @author Mikhail Mikhailov on Dec 17, 2019
 */
@Configuration
public class ReindexJobConfiguration extends AbstractConfiguration {

    /**
     * The id.
     */
    private static final ConfigurationId ID = () -> "REINDEX_JOB_CONFIGURATION";

    @Autowired
    private HazelcastInstance hazelcastInstance;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * {@inheritDoc}
     */
    @Override
    protected ConfigurationId getId() {
        return ID;
    }

    @Bean
    public ModularBatchJobSupportFactoryBean reindexBatchSetPostProcessors() {
        ModularBatchJobSupportFactoryBean fb = new ModularBatchJobSupportFactoryBean();
        fb.setJobName(ReindexJobConfigurationConstants.JOB_NAME);
        return fb;
    }

    @Bean
    public StepLocator stepLocator(ApplicationContext applicationContext) {
        BeanFactoryStepLocator bfsl = new BeanFactoryStepLocator();
        bfsl.setBeanFactory(applicationContext);
        return bfsl;
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_QUEUE)
    public IQueue<Message<StepExecutionRequest>> reindexDataQueue() {
        return hazelcastInstance.getQueue(ReindexJobConfigurationConstants.BEAN_NAME_DATA_QUEUE);
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MODEL_TOPIC)
    public ITopic<Message<StepExecutionRequest>> reindexMetaTopic() {
        return hazelcastInstance.getTopic(ReindexJobConfigurationConstants.BEAN_NAME_MODEL_TOPIC);
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPPING_EXECUTOR)
    public SimpleAsyncTaskExecutor
        mappingAsyncTaskExecutor(@Value("${" + ReindexJobConfigurationConstants.PROP_NAME_MAPPING_CONCURRENCY + ":4}") int concurrency) {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(ReindexJobConfigurationConstants.BEAN_NAME_MAPPPING_EXECUTOR + "-");
        executor.setConcurrencyLimit(concurrency);
        return executor;
    }

    @Bean
    public ReindexDataJobTypesEnumParamExtractor reindexDataJobTypesEnumParamExtractor(@Autowired MetaModelService metaModelService) {
        return new ReindexDataJobTypesEnumParamExtractor(metaModelService);
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_PARAMETERS_TEMPLATE)
    public JobTemplateParameters templateParameters(
            @Autowired PositiveValueJobParameterValidator validator,
            @Autowired ReindexDataJobTypesEnumParamExtractor extractor) {

        Map<String, Object> params = new HashMap<>();
        params.put("jobUser", "");
        params.put("reindexTypes", extractor);
        params.put("usersSelector", "");
        params.put("forceModelRefresh", Boolean.FALSE);
        params.put("updateMappings", Boolean.FALSE);
        params.put("cleanIndexes", Boolean.FALSE);
        params.put("blockSize", Long.valueOf(1000L));
        params.put("reindexRecords", Boolean.TRUE);
        params.put("reindexRelations", Boolean.TRUE);
        params.put("reindexClassifiers", Boolean.TRUE);
        params.put("reindexMatching", Boolean.TRUE);
        params.put("skipDq", Boolean.TRUE);
        params.put("skipConsistencyCheck", Boolean.TRUE);
        params.put("skipNotifications", Boolean.TRUE);
        params.put("filters", "");
        params.put("operationId", "");
        params.put("writeIdLog", Boolean.FALSE);
        params.put("processIdLog", Boolean.FALSE);

        Map<String, JobParameterValidator> validators = new HashMap<>();
        validators.put("blockSize", validator);

        JobTemplateParameters jtp = new JobTemplateParameters(ReindexJobConfigurationConstants.JOB_NAME);
        jtp.setValueMap(params);
        jtp.setValidators(validators);

        return jtp;
    }

    // -------------------------------------------------------
    // LISTENERS
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_JOB_EXEC_LISTENER)
    @JobScope
    public JobExecutionListener reindexDataJobListener(
            @Value("${" + ReindexJobConfigurationConstants.PROP_NAME_JOB_DESCRIPTION_CODE + ":Reindex job}") String descriptionCode) {
        ReindexDataJobListener l = new ReindexDataJobListener();
        l.setJobDescription(descriptionCode);
        return l;
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP_EXEC_LISTENER)
    @StepScope
    public StepExecutionListener reindexDataJobDataStepExecutionListener() {
        ReindexDataJobDataStepExecutionListener l = new ReindexDataJobDataStepExecutionListener();
        l.setStepName(ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP);
        l.setContextName(MeasurementContextName.MEASURE_STEP_REINDEX);
        return l;
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_STEP_EXEC_LISTENER)
    @StepScope
    public StepExecutionListener reindexDataJobMappingStepExecutionListener() {
        ModularBatchJobStepExecutionListener l = new ModularBatchJobStepExecutionListener();
        l.setStepName(ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_STEP);
        return l;
    }

    // END OF LISTENERS
    // -------------------------------------------------------
    // STEP EXECUTOR
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_STEP_EXEC_HANDLER)
    public JobStepExecutionRequestHandler reindexRemoteStepExecutionRequestHandler(
            JobExplorer jobExplorer,
            JobRepository jobRepository,
            StepLocator jobStepLocator,
            @Qualifier(ReindexJobConfigurationConstants.BEAN_NAME_DATA_QUEUE) IQueue<Message<StepExecutionRequest>> reindexQueue,
            @Qualifier(ReindexJobConfigurationConstants.BEAN_NAME_MODEL_TOPIC) ITopic<Message<StepExecutionRequest>> reindexTopic,
            @Value("${" + ReindexJobConfigurationConstants.PROP_NAME_DATA_REINDEX_THREADS + ":2}") int concurrency) {

        JobStepExecutionRequestHandler handler = new JobStepExecutionRequestHandler();
        handler.setJobExplorer(jobExplorer);
        handler.setJobRepository(jobRepository);
        handler.setJobName(ReindexJobConfigurationConstants.JOB_NAME);
        handler.setStepLocator(jobStepLocator);
        handler.setQueue(reindexQueue);
        handler.setTopic(reindexTopic);
        handler.setThreadCount(concurrency);
        return handler;
    }
    // END OF STEP EXECUTOR
    // -------------------------------------------------------
    // DATA ITEMS
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_PARTITION_HANDLER)
    public PartitionHandler reindexDataJobDataPartitionHandler(
            @Qualifier("coreDataSource") DataSource coreDataSource,
            JobExplorer jobExplorer,
            @Qualifier(ReindexJobConfigurationConstants.BEAN_NAME_DATA_QUEUE) BlockingQueue<Message<StepExecutionRequest>> reindexDataJobDataQueue) {

        JobFairPartitionHandler handler = new JobFairPartitionHandler();
        handler.setStepName(ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP);
        handler.setDataSource(coreDataSource);
        handler.setJobExplorer(jobExplorer);
        handler.setGridSize(8);
        handler.setMessagingGateway(reindexDataJobDataQueue);
        handler.setGrouped(false);
        return handler;
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_PARTITIONER)
    @JobScope
    public Partitioner reindexDataJobDataPartitioner() {
        return new ReindexDataJobDataPartitioner();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_ITEM_READER)
    @StepScope
    public ReindexDataJobDataItemReader reindexDataJobDataItemReader() {
        return new ReindexDataJobDataItemReader();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_ITEM_PROCESSOR)
    @StepScope
    public ReindexDataJobDataItemProcessor reindexDataJobDataItemProcessor() {
        return new ReindexDataJobDataItemProcessor();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_ITEM_WRITER)
    @StepScope
    public ReindexDataJobDataItemWriter reindexDataJobDataItemWriter() {
        return new ReindexDataJobDataItemWriter();
    }
    // END OF DATA ITEM HANDLERS
    // -------------------------------------------------------
    // MAPPING ITEM HANDLERS
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_PREPARE_PARTITIONER)
    @JobScope
    public Partitioner reindexDataJobPreparePartitioner() {
        return new ReindexDataJobPreparePartitioner();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_PARTITIONER)
    @JobScope
    public Partitioner reindexDataJobResetPartitioner() {
        return new ReindexDataJobResetPartitioner();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_PARTITIONER)
    @JobScope
    public Partitioner reindexDataJobMappingPartitioner() {
        return new ReindexDataJobMappingPartitioner();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_ITEM_READER)
    @StepScope
    public ItemReader<String> reindexDataJobMappingItemReader() {
        return new ReindexDataJobMappingItemReader();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_ITEM_PROCESSOR)
    @StepScope
    public ItemProcessor<String, String> reindexDataJobMappingItemProcessor() {
        return new PassThroughItemProcessor<>();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_ITEM_WRITER)
    @StepScope
    public ItemWriter<String> reindexDataJobMappingItemWriter() {
        return new ReindexDataJobMappingItemWriter();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_PREPARE_ITEM_WRITER)
    @StepScope
    public ItemWriter<String> reindexDataJobPrepareItemWriter() {
        return new ReindexDataJobPrepareItemWriter();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_ITEM_WRITER)
    @StepScope
    public ItemWriter<String> reindexDataJobResetItemWriter() {
        return new ReindexDataJobResetItemWriter();
    }

    // END OF MAPPING ITEM HANDLERS
    // -------------------------------------------------------
    // RESET MODEL ITEM HANDLERS
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_PARTITION_HANDLER)
    public PartitionHandler reindexDataJobResetModelPartitionHandler(
            @Qualifier("coreDataSource") DataSource coreDataSource,
            JobExplorer jobExplorer,
            @Qualifier(ReindexJobConfigurationConstants.BEAN_NAME_MODEL_TOPIC) ITopic<Message<StepExecutionRequest>> reindexDataJobResetModelTopic) {

        JobTargetedPartitionHandler handler = new JobTargetedPartitionHandler();
        handler.setStepName(ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_STEP);
        handler.setDataSource(coreDataSource);
        handler.setJobExplorer(jobExplorer);
        handler.setGridSize(8);
        handler.setMessagingGateway(reindexDataJobResetModelTopic);
        return handler;
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_PARTITIONER)
    @JobScope
    public Partitioner reindexDataJobResetModelPartitioner() {
        return new ReindexDataJobResetModelPartitioner();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_ITEM_READER)
    @StepScope
    public ItemReader<Boolean> reindexDataJobResetModelItemReader() {
        return new ReindexDataJobResetModelItemReader();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_ITEM_PROCESSOR)
    @StepScope
    public ItemProcessor<Boolean, Boolean> reindexDataJobResetModelItemProcessor() {
        return new PassThroughItemProcessor<>();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_ITEM_WRITER)
    @StepScope
    public ItemWriter<Boolean> reindexDataJobResetModelItemWriter() {
        return new ReindexDataJobResetModelItemWriter();
    }
    // END OF RESET MODEL ITEM HANDLERS
    // -------------------------------------------------------
    // STEPS
    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_STEP)
    public Step reindexDataJobResetModelStep(
            ItemReader<Boolean> reindexDataJobResetModelItemReader,
            ItemProcessor<Boolean, Boolean> reindexDataJobResetModelItemProcessor,
            ItemWriter<Boolean> reindexDataJobResetModelItemWriter) {

        return stepBuilderFactory.get(ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_STEP)
                .<Boolean, Boolean>chunk(1)
                .reader(reindexDataJobResetModelItemReader)
                .processor(reindexDataJobResetModelItemProcessor)
                .writer(reindexDataJobResetModelItemWriter)
                .build();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_STEP)
    public Step reindexDataJobMappingStep(
            StepExecutionListener reindexDataJobMappingStepExecutionListener,
            ItemReader<String> reindexDataJobMappingItemReader,
            ItemProcessor<String, String> reindexDataJobMappingItemProcessor,
            ItemWriter<String> reindexDataJobMappingItemWriter) {

        return stepBuilderFactory.get(ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_STEP)
                .listener(reindexDataJobMappingStepExecutionListener)
                .<String, String>chunk(1)
                .reader(reindexDataJobMappingItemReader)
                .processor(reindexDataJobMappingItemProcessor)
                .writer(reindexDataJobMappingItemWriter)
                .build();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_PREPARE_STEP)
    public Step reindexDataJobPrepareStep(
            ItemReader<String> reindexDataJobMappingItemReader,
            ItemProcessor<String, String> reindexDataJobMappingItemProcessor,
            ItemWriter<String> reindexDataJobPrepareItemWriter) {

        return stepBuilderFactory.get(ReindexJobConfigurationConstants.BEAN_NAME_PREPARE_STEP)
                .<String, String>chunk(1)
                .reader(reindexDataJobMappingItemReader)
                .processor(reindexDataJobMappingItemProcessor)
                .writer(reindexDataJobPrepareItemWriter)
                .build();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP)
    public Step reindexDataJobDataStep(
            @Value("${" + ReindexJobConfigurationConstants.PROP_NAME_DATA_COMMIT_INTERVAL + ":1000}") int commitInterval,
            StepExecutionListener reindexDataJobDataStepExecutionListener,
            ReindexDataJobDataItemReader reindexDataJobDataItemReader,
            ReindexDataJobDataItemProcessor reindexDataJobDataItemProcessor,
            ReindexDataJobDataItemWriter reindexDataJobDataItemWriter) {

        return stepBuilderFactory.get(ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP)
                .listener(reindexDataJobDataStepExecutionListener)
                .<Triple<LSN, UUID, String>, UpsertRequestContext>chunk(commitInterval)
                .reader(reindexDataJobDataItemReader)
                .processor(reindexDataJobDataItemProcessor)
                .writer(reindexDataJobDataItemWriter)
                .build();
    }

    @Bean(name = ReindexJobConfigurationConstants.BEAN_NAME_RESET_STEP)
    public Step reindexDataJobResetStep(
            ItemReader<String> reindexDataJobMappingItemReader,
            ItemProcessor<String, String> reindexDataJobMappingItemProcessor,
            ItemWriter<String> reindexDataJobResetItemWriter) {

        return stepBuilderFactory.get(ReindexJobConfigurationConstants.BEAN_NAME_RESET_STEP)
                .<String, String>chunk(1)
                .reader(reindexDataJobMappingItemReader)
                .processor(reindexDataJobMappingItemProcessor)
                .writer(reindexDataJobResetItemWriter)
                .build();
    }
    // END OF STEPS
    // -------------------------------------------------------
    @Bean(name = ReindexJobConfigurationConstants.JOB_NAME)
    public Job reindexDataJob(
            @Autowired JobRepository jobRepository,
            @Autowired JobExecutionListener reindexDataJobListener,
            @Autowired JobBuilderFactory jobBuilderFactory,
            @Autowired TaskExecutor reindexDataJobMappingTaskExecutor,
            // Reset model, if requested
            @Autowired Partitioner reindexDataJobResetModelPartitioner,
            @Autowired PartitionHandler reindexDataJobResetModelPartitionHandler,
            // Ensure mappings
            @Autowired Partitioner reindexDataJobMappingPartitioner,
            @Autowired Step reindexDataJobMappingStep,
            // Prepare indexes
            @Autowired Partitioner reindexDataJobPreparePartitioner,
            @Autowired Step reindexDataJobPrepareStep,
            // Process data
            @Autowired Partitioner reindexDataJobDataPartitioner,
            @Autowired PartitionHandler reindexDataJobDataPartitionHandler,
            // Reset indexes
            @Autowired Partitioner reindexDataJobResetPartitioner,
            @Autowired Step reindexDataJobResetStep) {

        return jobBuilderFactory.get(ReindexJobConfigurationConstants.JOB_NAME)
            .repository(jobRepository)
            .preventRestart()
            .listener(reindexDataJobListener)
            // 1. Force model refresh, if requested.
            .start(stepBuilderFactory.get("reindexDataJobResetModel")
                    .partitioner(ReindexJobConfigurationConstants.BEAN_NAME_RESET_MODEL_STEP, reindexDataJobResetModelPartitioner)
                    .partitionHandler(reindexDataJobResetModelPartitionHandler)
                    .build())
            // 2. Ensure mappings exist. Create, if necessary.
            .next(stepBuilderFactory.get("reindexDataJobMapping")
                    .partitioner(ReindexJobConfigurationConstants.BEAN_NAME_MAPPING_STEP, reindexDataJobMappingPartitioner)
                    .step(reindexDataJobMappingStep)
                    .taskExecutor(reindexDataJobMappingTaskExecutor)
                    .build())
            // 3. Apply various index settings (throttling, refresh interval, etc.)
            .next(stepBuilderFactory.get("reindexDataJobPrepare")
                    .partitioner(ReindexJobConfigurationConstants.BEAN_NAME_PREPARE_STEP, reindexDataJobPreparePartitioner)
                    .step(reindexDataJobPrepareStep)
                    .taskExecutor(reindexDataJobMappingTaskExecutor)
                    .build())
            // 4. Reindex data actually
            .next(stepBuilderFactory.get("reindexDataJobData")
                    .partitioner(ReindexJobConfigurationConstants.BEAN_NAME_DATA_STEP, reindexDataJobDataPartitioner)
                    .partitionHandler(reindexDataJobDataPartitionHandler)
                    .build())
            // 5. Put indexes back into normal state (reset throttling, refresh interval, etc.)
            .next(stepBuilderFactory.get("reindexDataJobReset")
                    .partitioner(ReindexJobConfigurationConstants.BEAN_NAME_RESET_STEP, reindexDataJobResetPartitioner)
                    .step(reindexDataJobResetStep)
                    .taskExecutor(reindexDataJobMappingTaskExecutor)
                    .build())
            .build();
    }
}
